{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "349b4601-d5bd-41da-8e19-05d1d6040945",
   "metadata": {},
   "source": [
    "# Воркшоп по Spark и оптимизациям"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "deb83ec6-c6fd-4717-813d-5a438a7920fc",
   "metadata": {},
   "source": [
    "Данные в `retail_data.csv`\n",
    "\n",
    "```json\n",
    "{\n",
    "   \"InvoiceNo\":\"536365\",\n",
    "   \"StockCode\":\"85123A\",\n",
    "   \"Description\":\"WHITE HANGING HEART T-LIGHT HOLDER\",\n",
    "   \"Quantity\":\"6\",\n",
    "   \"InvoiceDate\":\"12/1/2010 8:26\",\n",
    "   \"UnitPrice\":\"2.55\",\n",
    "   \"CustomerID\":\"17850\",\n",
    "   \"Country\":\"United Kingdom\"\n",
    "}\n",
    "```\n",
    "\n",
    "Данные в `customer_data.csv`\n",
    "\n",
    "```json\n",
    "{\n",
    "   \"CustomerID\":\"12346\",\n",
    "   \"Address\":\"Unit 1047 Box 4089\\nDPO AA 57348\",\n",
    "   \"Birthdate\":\"1994-02-20 00:46:27\",\n",
    "   \"Email\":\"cooperalexis@hotmail.com\",\n",
    "   \"Name\":\"Lindsay Cowan\",\n",
    "   \"Username\":\"valenciajennifer\"\n",
    "}\n",
    "```\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a9dd50b7-22e6-4054-be6b-d65b2406bd82",
   "metadata": {},
   "source": [
    "## Задание 0\n",
    "\n",
    "Все логи по умолчанию пишутся в консоль. Чтобы увидеть их в ноутбуке, необходимо выполнить следующие действия:\n",
    " - В консоли докера с `pyspark` выполнить команду `ipython profile create`;\n",
    " - В файле `.ipython/profile_default/ipython_kernel_config.py` раскомментировать строку `c.IPKernelApp.capture_fd_output = True`;\n",
    " - Перезапустить `kernel` в ноутбуке."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "ce99ea0e-9fd6-4b07-82ba-72e7f92617f4",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession, functions as F"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "652ae990-ef47-4bc6-a052-cf7febf016cb",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "24/04/21 16:35:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    }
   ],
   "source": [
    "# Создать сессию Spark\n",
    "spark = SparkSession.builder.appName('yp-spark-workshop').master('local[*]') \\\n",
    "  .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "26e80c44-db7d-42af-b202-8c809ac68d9d",
   "metadata": {},
   "source": [
    "## Задание 1: repartition\n",
    "\n",
    "Репартицировать данные `retail_data` по стране."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "b4a27921-89b0-4a6d-956c-51d8384945d4",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_customer = spark.read \\\n",
    "       .format('json') \\\n",
    "       .option('mode', 'FAILFAST') \\\n",
    "       .load('/home/jovyan/customer_data.json')\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "fe7ed0a4-db01-429b-8f26-345986adf5c8",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Address: string (nullable = true)\n",
      " |-- Birthdate: string (nullable = true)\n",
      " |-- Country: string (nullable = true)\n",
      " |-- CustomerID: string (nullable = true)\n",
      " |-- Email: string (nullable = true)\n",
      " |-- Name: string (nullable = true)\n",
      " |-- Username: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_customer.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "09119395-49d2-4ff0-b2a9-a1d9fcf7ee91",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-------------------+--------------+----------+--------------------+-----------------+----------------+\n",
      "|             Address|          Birthdate|       Country|CustomerID|               Email|             Name|        Username|\n",
      "+--------------------+-------------------+--------------+----------+--------------------+-----------------+----------------+\n",
      "|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|     12346|cooperalexis@hotm...|    Lindsay Cowan|valenciajennifer|\n",
      "|55711 Janet Plaza...|1988-06-21 00:15:34|       Iceland|     12347|timothy78@hotmail...|  Katherine David|      hillrachel|\n",
      "|Unit 2676 Box 935...|1974-11-26 15:30:20|       Finland|     12348| tcrawford@gmail.com|  Leslie Martinez|    serranobrian|\n",
      "|2765 Powers Meado...|1977-05-06 23:57:35|         Italy|     12349|  dustin37@yahoo.com|    Brad Cardenas|   charleshudson|\n",
      "|17677 Mark Crest\\...|1996-09-13 19:14:27|        Norway|     12350|amyholland@yahoo.com|     Natalie Ford| gregoryharrison|\n",
      "|50047 Smith Point...|1969-06-21 03:39:20|        Norway|     12352| vcarter@hotmail.com|      Dana Clarke|          hmyers|\n",
      "|633 Miller Turnpi...|1993-02-25 18:37:29|       Bahrain|     12353|   laura34@yahoo.com|     Gary Nichols|  andrewhamilton|\n",
      "|38456 Rachael Cau...|1993-03-13 12:37:34|         Spain|     12354|   zmelton@gmail.com|       John Parks|      matthewray|\n",
      "|4140 Pamela Hollo...|1972-11-10 12:01:08|       Bahrain|     12355|   scott50@yahoo.com|Jennifer Lawrence|          glopez|\n",
      "|8681 Karen Roads ...|1973-01-13 17:17:26|      Portugal|     12356|josephmacias@hotm...|    James Sanchez|        wesley20|\n",
      "|18637 Jessica Rid...|1989-11-24 17:12:54|   Switzerland|     12357|michael16@hotmail...|     Ashley Lopez|     thomasdavid|\n",
      "|2129 Joel Rapids\\...|1977-06-19 22:35:52|       Austria|     12358|michaelespinoza@g...| Dr. Angela Brown|      patricia44|\n",
      "|86636 Maria Viadu...|1983-09-21 05:22:18|        Cyprus|     12359|  ryanpena@yahoo.com|        John Vega|     nelsonmaria|\n",
      "|1579 Young Trail\\...|1980-10-28 17:25:59|       Austria|     12360|briannafrost@yaho...|     Lauren Clark|   portermichael|\n",
      "|USNS Howard\\nFPO ...|1982-09-01 09:12:57|       Belgium|     12361|virginia36@hotmai...|Jacqueline Haynes|   johnsonshelly|\n",
      "|70092 Adams Prair...|1979-02-03 03:42:47|       Belgium|     12362|   april04@gmail.com|     Brian Flores|    hunterdaniel|\n",
      "|7322 Owens Inlet ...|1974-12-21 13:27:20|   Unspecified|     12363|   omolina@gmail.com|Christopher Gomez|         james75|\n",
      "|86176 Katherine C...|1990-07-17 15:47:12|       Belgium|     12364|barbaraduncan@gma...|     Robert Burns|          eric10|\n",
      "|932 Jeremy Spring...|1981-07-10 00:35:00|        Cyprus|     12365|nicoleanderson@ho...|    Joshua Parker|     millerrenee|\n",
      "|USNV Chavez\\nFPO ...|1989-12-26 00:58:01|       Denmark|     12367|   aaron99@yahoo.com|Christine Douglas|       michael58|\n",
      "+--------------------+-------------------+--------------+----------+--------------------+-----------------+----------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_customer.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "3026ab82-87ef-42bb-8f6b-4a0290c86f74",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_customer.rdd.getNumPartitions()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "76e99dc7-1558-4268-9cab-8fc333d7e751",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "newDf = df_customer.repartition(5, 'Country')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "5d2103c4-5a18-4c8a-86b8-d6618daba15b",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "newDf.rdd.getNumPartitions()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "0b8dd863-57b2-445e-8a99-d49f87037063",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "newDf.write.format('csv').save('/home/jovyan/partitionedData')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5d190d7f-2f8e-4d76-8a93-f83e55fb4511",
   "metadata": {},
   "source": [
    "## Задание 2: broadcast join\n",
    "\n",
    "Соединить два датафрейма по ключу `CustomerID`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fbfbb3b7-583b-4d82-baca-6db5ba2de1ef",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_retail = spark.read \\\n",
    "       .format('json') \\\n",
    "       .option('mode', 'FAILFAST') \\\n",
    "       .load('/home/jovyan/retail_data.json')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "439002af-8c4d-45f2-9bba-318e2620946b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_retail.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "189ed24c-1c8d-4d95-b1bf-6c54ba9fba57",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_customer.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4f577130-87c9-42f5-9780-7e44344a1b2b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import broadcast"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3c534d7b-b4a6-46ff-aa44-341d9aa8cd33",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_joined = df_retail.join(broadcast(df_customer), 'CustomerID')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2343753b-1a3f-411d-8679-acf74cd639df",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_joined.explain()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1837f627-1292-4879-9249-d3928302564d",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_joined.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f843284-085e-4a5a-a560-4bf90eeac88f",
   "metadata": {},
   "source": [
    "## Задание 3: кэш"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1c001524-d729-4d15-8a1c-787b936aea13",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_retail.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7d40a765-0c9c-48a0-a2df-565d30bd8f1a",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_retail.head()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
